# -*- coding: UTF-8 -*-
""" 
@author: hhyo
@license: Apache Licence
@file: archive.py
@time: 2020/01/10
"""
import logging
import os
import re
import traceback
import time

import simplejson as json
from django.conf import settings
from django.contrib.auth.decorators import permission_required
from django.db import transaction, connection, close_old_connections
from django.db.models import Q, Value as V, TextField
from django.db.models.functions import Concat
from django.http import HttpResponse, JsonResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse
from django_q.tasks import async_task

from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction
from common.utils.extend_json_encoder import ExtendJSONEncoder
from common.utils.timer import FuncTimer
from sql.engines import get_engine
from sql.notify import notify_for_audit
from sql.plugins.pt_archiver import PtArchiver
from sql.utils.resource_group import user_instances, user_groups
from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup
from sql.utils.workflow_audit import get_auditor, AuditException, Audit

logger = logging.getLogger("default")
__author__ = "hhyo"


@permission_required("sql.menu_archive", raise_exception=True)
def archive_list(request):
    """
    获取归档申请列表
    :param request:
    :return:
    """
    user = request.user
    filter_instance_id = request.GET.get("filter_instance_id")
    state = request.GET.get("state")
    limit = int(request.GET.get("limit", 0))
    offset = int(request.GET.get("offset", 0))
    limit = offset + limit
    search = request.GET.get("search", "")

    # 组合筛选项
    filter_dict = dict()
    if filter_instance_id:
        filter_dict["src_instance"] = filter_instance_id
    if state == "true":
        filter_dict["state"] = True
    elif state == "false":
        filter_dict["state"] = False

    # 管理员可以看到全部数据
    if user.is_superuser:
        pass
    # 拥有审核权限、可以查看组内所有工单
    elif user.has_perm("sql.archive_review"):
        # 先获取用户所在资源组列表
        group_list = user_groups(user)
        group_ids = [group.group_id for group in group_list]
        filter_dict["resource_group__in"] = group_ids
    # 其他人只能看到自己提交的工单
    else:
        filter_dict["user_name"] = user.username

    # 过滤组合筛选项
    archive_config = ArchiveConfig.objects.filter(**filter_dict)

    # 过滤搜索项，支持模糊搜索标题、用户
    if search:
        archive_config = archive_config.filter(
            Q(title__icontains=search) | Q(user_display__icontains=search)
        )

    count = archive_config.count()
    lists = archive_config.order_by("-id")[offset:limit].values(
        "id",
        "title",
        "src_instance__instance_name",
        "src_db_name",
        "src_table_name",
        "dest_instance__instance_name",
        "dest_db_name",
        "dest_table_name",
        "sleep",
        "mode",
        "no_delete",
        "status",
        "state",
        "user_display",
        "create_time",
        "resource_group__group_name",
    )

    # QuerySet 序列化
    rows = [row for row in lists]

    result = {"total": count, "rows": rows}
    # 返回查询结果
    return HttpResponse(
        json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
        content_type="application/json",
    )


@permission_required("sql.archive_apply", raise_exception=True)
def archive_apply(request):
    """申请归档实例数据"""
    user = request.user
    title = request.POST.get("title")
    group_name = request.POST.get("group_name")
    src_instance_name = request.POST.get("src_instance_name")
    src_db_name = request.POST.get("src_db_name")
    src_table_name = request.POST.get("src_table_name")
    mode = request.POST.get("mode")
    dest_instance_name = request.POST.get("dest_instance_name")
    dest_db_name = request.POST.get("dest_db_name")
    dest_table_name = request.POST.get("dest_table_name")
    condition = request.POST.get("condition")
    no_delete = True if request.POST.get("no_delete") == "true" else False
    sleep = request.POST.get("sleep") or 0
    result = {"status": 0, "msg": "ok", "data": {}}

    # 参数校验
    if (
        not all(
            [
                title,
                group_name,
                src_instance_name,
                src_db_name,
                src_table_name,
                mode,
                condition,
            ]
        )
        or no_delete is None
    ):
        return JsonResponse({"status": 1, "msg": "请填写完整！", "data": {}})
    if mode == "dest" and not all([dest_instance_name, dest_db_name, dest_table_name]):
        return JsonResponse(
            {"status": 1, "msg": "归档到实例时目标实例信息必选！", "data": {}}
        )

    # 获取源实例信息
    try:
        s_ins = user_instances(request.user, db_type=["mysql"]).get(
            instance_name=src_instance_name
        )
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例！", "data": {}})

    # 获取目标实例信息
    if mode == "dest":
        try:
            d_ins = user_instances(request.user, db_type=["mysql"]).get(
                instance_name=dest_instance_name
            )
        except Instance.DoesNotExist:
            return JsonResponse(
                {"status": 1, "msg": "你所在组未关联该实例！", "data": {}}
            )
    else:
        d_ins = None

    # 获取资源组和审批信息
    res_group = ResourceGroup.objects.get(group_name=group_name)
    # 使用事务保持数据一致性
    with transaction.atomic():
        # 保存申请信息到数据库
        archive_info = ArchiveConfig(
            title=title,
            resource_group=res_group,
            audit_auth_groups="",
            src_instance=s_ins,
            src_db_name=src_db_name,
            src_table_name=src_table_name,
            dest_instance=d_ins,
            dest_db_name=dest_db_name,
            dest_table_name=dest_table_name,
            condition=condition,
            mode=mode,
            no_delete=no_delete,
            sleep=sleep,
            status=WorkflowStatus.WAITING,
            state=False,
            user_name=user.username,
            user_display=user.display,
        )
        audit_handler = get_auditor(
            workflow=archive_info,
            resource_group=res_group.group_name,
            resource_group_id=res_group.group_id,
        )

        try:
            audit_handler.create_audit()
        except AuditException as e:
            logger.error(f"新建审批流失败: {str(e)}")
            return JsonResponse(
                {"status": 1, "msg": "新建审批流失败, 请联系管理员", "data": {}}
            )
        audit_handler.workflow.status = audit_handler.audit.current_status
        if audit_handler.audit.current_status == WorkflowStatus.PASSED:
            audit_handler.workflow.state = True
        audit_handler.workflow.save()
        async_task(
            notify_for_audit,
            workflow_audit=audit_handler.audit,
            timeout=60,
            task_name=f"archive-apply-{audit_handler.workflow.id}",
        )
    return JsonResponse(
        {
            "status": 0,
            "msg": "",
            "data": {
                "workflow_status": audit_handler.audit.current_status,
                "audit_id": audit_handler.audit.audit_id,
                "archive_id": audit_handler.workflow.id,
            },
        }
    )


@permission_required("sql.archive_review", raise_exception=True)
def archive_audit(request):
    """
    审核数据归档申请
    :param request:
    :return:
    """
    # 获取用户信息
    archive_id = int(request.POST["archive_id"])
    try:
        audit_status = WorkflowAction(int(request.POST["audit_status"]))
    except ValueError as e:
        return render(
            request,
            "error.html",
            {"errMsg": f"数据错误, 不允许的操作, 请检查 audit_status, error: {str(e)}"},
        )
    audit_remark = request.POST.get("audit_remark")

    if audit_remark is None:
        audit_remark = ""
    try:
        archive_workflow = ArchiveConfig.objects.get(id=archive_id)
    except ArchiveConfig.DoesNotExist:
        return render(request, "error.html", {"errMsg": "工单不存在"})

    resource_group = archive_workflow.resource_group
    auditor = get_auditor(workflow=archive_workflow, resource_group=resource_group)

    # 使用事务保持数据一致性
    with transaction.atomic():
        try:
            workflow_audit_detail = auditor.operate(
                audit_status, request.user, audit_remark
            )
        except AuditException as e:
            return render(request, "error.html", {"errMsg": f"审核失败: {str(e)}"})
        auditor.workflow.status = auditor.audit.current_status
        if auditor.audit.current_status == WorkflowStatus.PASSED:
            auditor.workflow.state = True
        auditor.workflow.save()
    async_task(
        notify_for_audit,
        workflow_audit=auditor.audit,
        workflow_audit_detail=workflow_audit_detail,
        timeout=60,
        task_name=f"archive-audit-{archive_id}",
    )

    return HttpResponseRedirect(reverse("sql:archive_detail", args=(archive_id,)))


def add_archive_task(archive_ids=None):
    """
    添加数据归档异步任务，仅处理有效归档任务
    :param archive_ids: 归档任务id列表
    :return:
    """
    archive_ids = archive_ids or []
    if not isinstance(archive_ids, list):
        archive_ids = list(archive_ids)
    # 没有传archive_id代表全部归档任务统一调度
    if archive_ids:
        archive_cnf_list = ArchiveConfig.objects.filter(
            id__in=archive_ids,
            state=True,
            status=WorkflowStatus.PASSED,
        )
    else:
        archive_cnf_list = ArchiveConfig.objects.filter(
            state=True, status=WorkflowStatus.PASSED
        )

    # 添加task任务
    for archive_info in archive_cnf_list:
        archive_id = archive_info.id
        async_task(
            "sql.archiver.archive",
            archive_id,
            group=f'archive-{time.strftime("%Y-%m-%d %H:%M:%S ")}',
            timeout=-1,
            task_name=f"archive-{archive_id}",
        )


def archive(archive_id):
    """
    执行数据库归档
    :return:
    """
    archive_info = ArchiveConfig.objects.get(id=archive_id)
    s_ins = archive_info.src_instance
    src_db_name = archive_info.src_db_name
    src_table_name = archive_info.src_table_name
    condition = archive_info.condition
    no_delete = archive_info.no_delete
    sleep = archive_info.sleep
    mode = archive_info.mode

    # 获取归档表的字符集信息
    s_engine = get_engine(s_ins)
    s_db = s_engine.schema_object.databases[src_db_name]
    s_tb = s_db.tables[src_table_name]
    s_charset = s_tb.options["charset"].value
    if s_charset is None:
        s_charset = s_db.options["charset"].value

    pt_archiver = PtArchiver()
    # 准备参数
    source = (
        rf"h={s_ins.host},u={s_ins.user},p={s_ins.password},"
        rf"P={s_ins.port},D={src_db_name},t={src_table_name},A={s_charset}"
    )
    args = {
        "no-version-check": True,
        "source": source,
        "where": condition,
        "progress": 5000,
        "statistics": True,
        "charset": "utf8",
        "limit": 10000,
        "txn-size": 1000,
        "sleep": sleep,
    }

    # 归档到目标实例
    if mode == "dest":
        d_ins = archive_info.dest_instance
        dest_db_name = archive_info.dest_db_name
        dest_table_name = archive_info.dest_table_name
        # 目标表的字符集信息
        schema_object = get_engine(d_ins).schema_object
        d_db = schema_object.databases[dest_db_name]
        d_tb = d_db.tables[dest_table_name]
        d_charset = d_tb.options["charset"].value
        if d_charset is None:
            d_charset = d_db.options["charset"].value
        schema_object.connection.close()
        # dest
        dest = (
            rf"h={d_ins.host},u={d_ins.user},p={d_ins.password},P={d_ins.port},"
            rf"D={dest_db_name},t={dest_table_name},A={d_charset}"
        )
        args["dest"] = dest
        if no_delete:
            args["no-delete"] = True
    elif mode == "file":
        output_directory = os.path.join(settings.BASE_DIR, "downloads/archiver")
        os.makedirs(output_directory, exist_ok=True)
        args["file"] = (
            f"{output_directory}/{s_ins.instance_name}-{src_db_name}-{src_table_name}.txt"
        )
        if no_delete:
            args["no-delete"] = True
    elif mode == "purge":
        args["purge"] = True

    # 参数检查
    args_check_result = pt_archiver.check_args(args)
    if args_check_result["status"] == 1:
        return JsonResponse(args_check_result)
    # 参数转换
    cmd_args = pt_archiver.generate_args2cmd(args)
    # 执行命令，获取结果
    select_cnt = 0
    insert_cnt = 0
    delete_cnt = 0
    with FuncTimer() as t:
        p = pt_archiver.execute_cmd(cmd_args)
        stdout = ""
        for line in iter(p.stdout.readline, ""):
            if re.match(r"^SELECT\s(\d+)$", line, re.I):
                select_cnt = re.findall(r"^SELECT\s(\d+)$", line)
            elif re.match(r"^INSERT\s(\d+)$", line, re.I):
                insert_cnt = re.findall(r"^INSERT\s(\d+)$", line)
            elif re.match(r"^DELETE\s(\d+)$", line, re.I):
                delete_cnt = re.findall(r"^DELETE\s(\d+)$", line)
            stdout += f"{line}\n"
    statistics = stdout
    # 获取异常信息
    stderr = p.stderr.read()
    if stderr:
        statistics = stdout + stderr

    # 判断归档结果
    select_cnt = int(select_cnt[0]) if select_cnt else 0
    insert_cnt = int(insert_cnt[0]) if insert_cnt else 0
    delete_cnt = int(delete_cnt[0]) if delete_cnt else 0
    error_info = ""
    success = True
    if stderr:
        error_info = f"命令执行报错:{stderr}"
        success = False
    if mode == "dest":
        # 删除源数据，判断删除数量和写入数量
        if not no_delete and (insert_cnt != delete_cnt):
            error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}"
            success = False
    elif mode == "file":
        # 删除源数据，判断查询数量和删除数量
        if not no_delete and (select_cnt != delete_cnt):
            error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
            success = False
    elif mode == "purge":
        # 直接删除。判断查询数量和删除数量
        if select_cnt != delete_cnt:
            error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
            success = False

    # 执行信息保存到数据库
    if connection.connection and not connection.is_usable():
        close_old_connections()
    # 更新最后归档时间
    ArchiveConfig(id=archive_id, last_archive_time=t.end).save(
        update_fields=["last_archive_time"]
    )
    # 替换密码信息后保存
    shell_cmd = " ".join(cmd_args)
    ArchiveLog.objects.create(
        archive=archive_info,
        cmd=(
            shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
            if mode == "dest"
            else shell_cmd.replace(s_ins.password, "***")
        ),
        condition=condition,
        mode=mode,
        no_delete=no_delete,
        sleep=sleep,
        select_cnt=select_cnt,
        insert_cnt=insert_cnt,
        delete_cnt=delete_cnt,
        statistics=statistics,
        success=success,
        error_info=error_info,
        start_time=t.start,
        end_time=t.end,
    )
    if not success:
        raise Exception(f"{error_info}\n{statistics}")


@permission_required("sql.menu_archive", raise_exception=True)
def archive_log(request):
    """获取归档日志列表"""
    limit = int(request.GET.get("limit", 0))
    offset = int(request.GET.get("offset", 0))
    limit = offset + limit
    archive_id = request.GET.get("archive_id")

    archive_logs = ArchiveLog.objects.filter(archive=archive_id).annotate(
        info=Concat("cmd", V("\n"), "statistics", output_field=TextField())
    )
    count = archive_logs.count()
    lists = archive_logs.order_by("-id")[offset:limit].values(
        "cmd",
        "info",
        "condition",
        "mode",
        "no_delete",
        "select_cnt",
        "insert_cnt",
        "delete_cnt",
        "success",
        "error_info",
        "start_time",
        "end_time",
    )
    # QuerySet 序列化
    rows = [row for row in lists]
    result = {"total": count, "rows": rows}
    # 返回查询结果
    return HttpResponse(
        json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
        content_type="application/json",
    )


@permission_required("sql.archive_mgt", raise_exception=True)
def archive_switch(request):
    """开启关闭归档任务"""
    archive_id = request.POST.get("archive_id")
    state = True if request.POST.get("state") == "true" else False
    # 更新启用状态
    try:
        ArchiveConfig(id=archive_id, state=state).save(update_fields=["state"])
        return JsonResponse({"status": 0, "msg": "ok", "data": {}})
    except Exception as msg:
        return JsonResponse({"status": 1, "msg": f"{msg}", "data": {}})


@permission_required("sql.archive_mgt", raise_exception=True)
def archive_once(request):
    """单次立即调用归档任务"""
    archive_id = request.GET.get("archive_id")
    async_task(
        "sql.archiver.archive",
        archive_id,
        timeout=-1,
        task_name=f"archive-{archive_id}",
    )
    return JsonResponse({"status": 0, "msg": "ok", "data": {}})
